交互机制概述#
Skills 与主代理的交互是 Claude Code 系统的核心。主代理负责协调和管理 Skills,而 Skills 则提供具体的功能实现。本节将深入探讨两者之间的交互机制。
交互模式#
1. 主动调用模式#
1.1 调用流程
主动调用流程#
步骤 1:任务识别#
主代理接收用户请求,分析任务类型和需求
步骤 2:Skill 选择#
根据任务需求,从可用 Skills 中选择最合适的 Skill
步骤 3:参数准备#
准备 Skill 需要的参数和上下文信息
步骤 4:Skill 调用#
调用选定的 Skill,传递参数和上下文
步骤 5:结果处理#
接收 Skill 的执行结果,进行必要的处理和整合
步骤 6:响应生成#
基于 Skill 的结果,生成最终的响应返回给用户
1.2 代码示例
bashpython class MainAgent: def __init__(self): self.skills = load_skills() self.context_manager = ContextManager() def process_request(self, user_request): # 1. 任务识别 task = self.analyze_task(user_request) # 2. Skill 选择 skill = self.select_skill(task) # 3. 参数准备 context = self.context_manager.collect_context(skill, task) parameters = self.prepare_parameters(task, context) # 4. Skill 调用 result = skill.execute(parameters, context) # 5. 结果处理 processed_result = self.process_result(result, context) # 6. 响应生成 response = self.generate_response(processed_result) return response ### 2. 被动调用模式 #### 2.1 调用流程 ## 被动调用流程 ### 步骤 1:用户指定 用户明确指定要使用的 Skill ### 步骤 2:参数验证 验证用户提供的参数是否有效 ### 步骤 3:上下文收集 收集 Skill 需要的上下文信息 ### 步骤 4:Skill 执行 执行指定的 Skill ### 步骤 5:结果返回 直接返回 Skill 的执行结果
2.2 代码示例
bashpython class MainAgent: def execute_skill(self, skill_name, user_parameters): # 1. 验证 Skill 存在 if skill_name not in self.skills: raise SkillNotFoundError(skill_name) skill = self.skills[skill_name] # 2. 参数验证 validated_params = skill.validate_parameters(user_parameters) # 3. 上下文收集 context = self.context_manager.collect_context(skill, validated_params) # 4. Skill 执行 result = skill.execute(validated_params, context) # 5. 结果返回 return result ### 3. 嵌套调用模式 #### 3.1 调用流程 ## 嵌套调用流程 ### 示例场景:部署应用 ### 调用层次 主代理 └─> 部署 Skill ├─> 测试 Skill │ └─> 代码分析 Skill │ └─> 文档检查 Skill ├─> 构建 Skill │ └─> 依赖检查 Skill └─> 验证 Skill └─> 健康检查 Skill ### 执行流程
3.2 代码示例
bashpython class DeploymentSkill(Skill): def execute(self, parameters, context): # 调用测试 Skill test_result = self.call_skill("test", context) if not test_result.success: return DeploymentResult(success=False, error="Tests failed") # 调用构建 Skill build_result = self.call_skill("build", context) if not build_result.success: return DeploymentResult(success=False, error="Build failed") # 执行部署 deploy_result = self.deploy(build_result.artifact) # 调用验证 Skill verify_result = self.call_skill("verify", context) return DeploymentResult( success=verify_result.success, deploy_result=deploy_result, verify_result=verify_result ) ## 通信机制 ### 1. 消息传递 #### 1.1 消息格式 ## 消息格式 ### 请求消息 ~~~`json `json { "message_id": "msg_123456", "timestamp": "2024-01-15T10:30:00Z", "type": "skill_request", "skill_name": "code-review", "parameters": { "file": "src/main.py", "strict": true
}, "context": { "project": {...}, "code": {...}, "user": {...} } }
bash~~~ json { "message_id": "msg_123456", "timestamp": "2024-01-15T10:30:15Z", "type": "skill_response", "status": "success", "result": { "issues": [...], "summary": {...} }, "metadata": { "execution_time": 15.2, "memory_used": "256MB" } } ### 错误消息 ~~~`json ````json { "message_id": "msg_123456", "timestamp": "2024-01-15T10:30:10Z", "type": "skill_error", "error": { "code": "FILE_NOT_FOUND", "message": "File src/main.py not found", "details": {...} } } ```> > ~~~ #### 1.2 消息队列 class MessageQueue: def __init__(self): self.queue = asyncio.Queue() self.handlers = {} async def send(self, message): await self.queue.put(message) async def receive(self): return await self.queue.get() def register_handler(self, message_type, handler): self.handlers[message_type] = handler async def process_messages(self): while True: message = await self.receive() handler = self.handlers.get(message.type) if handler: await handler(message) ~~~ ### 2. 事件驱动 #### 2.1 事件类型 ~~~ markdown ## 事件类型 ### Skill 事件 - skill_started: Skill 开始执行 - skill_progress: Skill 执行进度更新 - skill_completed: Skill 执行完成 - skill_failed: Skill 执行失败 ### 上下文事件 - context_updated: 上下文更新 - context_invalidated: 上下文失效 ### 工具事件 - tool_called: 工具被调用 - tool_completed: 工具执行完成 - tool_failed: 工具执行失败 #### 2.2 事件处理 class EventHandler: def __init__(self): self.listeners = defaultdict(list) def on(self, event_type, callback): self.listeners[event_type].append(callback) async def emit(self, event_type, data): for callback in self.listeners.get(event_type, []): await callback(data) async def handle_skill_started(self, event): print(f"Skill {event.skill_name} started") async def handle_skill_progress(self, event): print(f"Progress: {event.progress}%") async def handle_skill_completed(self, event): print(f"Skill {event.skill_name} completed") ~~~ ### 3. 流式通信 #### 3.1 流式输出 ~~~ python class StreamingSkill(Skill): async def execute_stream(self, parameters, context): # 步骤 1 yield {"step": 1, "message": "Analyzing code..."} result1 = await self.analyze_code(parameters, context) # 步骤 2 yield {"step": 2, "message": "Checking security..."} result2 = await self.check_security(result1, context) # 步骤 3 yield {"step": 3, "message": "Generating report..."} result3 = await self.generate_report(result2, context) # 最终结果 yield {"step": 4, "message": "Completed", "result": result3} #### 3.2 流式消费 async def consume_stream(skill, parameters, context): async for chunk in skill.execute_stream(parameters, context): if "message" in chunk: print(chunk["message"]) if "result" in chunk: return chunk["result"] ~~~ ## 状态管理 ### 1. 执行状态 #### 1.1 状态类型 ~~~ markdown ## 执行状态 ### 状态定义 - PENDING: 等待执行 - RUNNING: 正在执行 - PAUSED: 已暂停 - COMPLETED: 已完成 - FAILED: 执行失败 - CANCELLED: 已取消 ### 状态转换 PENDING → RUNNING → COMPLETED PENDING → RUNNING → FAILED RUNNING → PAUSED → RUNNING RUNNING → CANCELLED #### 1.2 状态管理 class ExecutionState: def __init__(self): self.state = "PENDING" self.start_time = None self.end_time = None self.progress = 0 self.error = None def start(self): self.state = "RUNNING" self.start_time = datetime.now() def complete(self): self.state = "COMPLETED" self.end_time = datetime.now() def fail(self, error): self.state = "FAILED" self.error = error self.end_time = datetime.now() def update_progress(self, progress): self.progress = progress def get_duration(self): if self.start_time and self.end_time: return (self.end_time - self.start_time).total_seconds() return None ~~~ ### 2. 上下文状态 #### 2.1 上下文快照 ~~~ python class ContextSnapshot: def __init__(self, context): self.timestamp = datetime.now() self.context = copy.deepcopy(context) self.version = self.generate_version() def generate_version(self): return hashlib.md5( json.dumps(self.context, sort_keys=True).encode() ).hexdigest() def compare(self, other_snapshot): return self.version == other_snapshot.version #### 2.2 上下文恢复 class ContextManager: def __init__(self): self.snapshots = [] self.current_context = {} def create_snapshot(self): snapshot = ContextSnapshot(self.current_context) self.snapshots.append(snapshot) return snapshot def restore_snapshot(self, snapshot): self.current_context = copy.deepcopy(snapshot.context) def rollback_to(self, version): for snapshot in reversed(self.snapshots): if snapshot.version == version: self.restore_snapshot(snapshot) return True return False ~~~ ### 3. 会话状态 #### 3.1 会话管理 ~~~ python class SessionManager: def __init__(self): self.sessions = {} self.current_session_id = None def create_session(self): session_id = generate_id() self.sessions[session_id] = { "id": session_id, "created_at": datetime.now(), "context": {}, "history": [], "state": "ACTIVE" } self.current_session_id = session_id return session_id def get_session(self, session_id): return self.sessions.get(session_id) def update_session(self, session_id, updates): if session_id in self.sessions: self.sessions[session_id].update(updates) def close_session(self, session_id): if session_id in self.sessions: self.sessions[session_id]["state"] = "CLOSED" self.sessions[session_id]["closed_at"] = datetime.now() ## 错误处理 ### 1. 错误传播 #### 1.1 错误类型 ## 错误类型 ### Skill 错误 - SkillNotFoundError: Skill 不存在 - SkillExecutionError: Skill 执行失败 - SkillTimeoutError: Skill 执行超时 ### 参数错误 - ParameterValidationError: 参数验证失败 - MissingParameterError: 缺少必需参数 - InvalidParameterError: 参数值无效 ### 上下文错误 - ContextNotFoundError: 上下文不存在 - ContextInvalidError: 上下文无效 - ContextTimeoutError: 上下文获取超时 ~~~ #### 1.2 错误处理策略 ~~~ python class ErrorHandler: def __init__(self): self.retries = {} self.fallbacks = {} def handle_error(self, error, context): error_type = type(error).__name__ # 检查是否应该重试 if self.should_retry(error_type): return self.retry(error, context) # 检查是否有回退方案 if self.has_fallback(error_type): return self.fallback(error, context) # 否则抛出错误 raise error def should_retry(self, error_type): return error_type in self.retries def retry(self, error, context): retry_config = self.retries[type(error).__name__] max_attempts = retry_config.get("max_attempts", 3) delay = retry_config.get("delay", 1) attempt = context.get("attempt", 0) + 1 if attempt < max_attempts: context["attempt"] = attempt time.sleep(delay * attempt) return "RETRY" return error def has_fallback(self, error_type): return error_type in self.fallbacks def fallback(self, error, context): fallback_func = self.fallbacks[type(error).__name__] return fallback_func(error, context) ### 2. 错误恢复 #### 2.1 恢复策略 ## 恢复策略 ### 自动恢复 - 重试机制 - 回退方案 - 降级处理 ### 手动恢复 - 用户确认 - 参数修正 - 上下文调整 ### 状态恢复 - 快照恢复 - 断点续传 - 事务回滚 ~~~ #### 2.2 恢复实现 ~~~ python class RecoveryManager: def __init__(self): self.checkpoints = {} def create_checkpoint(self, execution_id, state): self.checkpoints[execution_id] = { "timestamp": datetime.now(), "state": copy.deepcopy(state) } def restore_checkpoint(self, execution_id): if execution_id in self.checkpoints: return copy.deepcopy(self.checkpoints[execution_id]["state"]) return None def recover_from_error(self, error, execution_id): # 恢复到检查点 state = self.restore_checkpoint(execution_id) if state: # 尝试恢复执行 return self.resume_execution(state) # 如果没有检查点,尝试其他恢复策略 return self.alternative_recovery(error) ## 性能优化 ### 1. 并行执行 #### 1.1 并行策略 class ParallelExecutor: def __init__(self, max_workers=4): self.max_workers = max_workers self.executor = ThreadPoolExecutor(max_workers=max_workers) async def execute_parallel(self, tasks): futures = [] for task in tasks: future = self.executor.submit(task.execute) futures.append(future) results = [] for future in futures: result = await asyncio.wrap_future(future) results.append(result) return results ~~~ #### 1.2 依赖管理 ~~~ python class DependencyManager: def __init__(self): self.dependencies = {} def add_dependency(self, task, depends_on): if task not in self.dependencies: self.dependencies[task] = [] self.dependencies[task].extend(depends_on) def get_execution_order(self, tasks): order = [] visited = set() def visit(task): if task in visited: return visited.add(task) for dep in self.dependencies.get(task, []): visit(dep) order.append(task) for task in tasks: visit(task) return order ### 2. 资源管理 #### 2.1 资源池 class ResourcePool: def __init__(self, max_resources): self.max_resources = max_resources self.available = max_resources self.lock = asyncio.Lock() async def acquire(self): async with self.lock: while self.available <= 0: await asyncio.sleep(0.1) self.available -= 1 return True async def release(self): async with self.lock: self.available += 1 async def __aenter__(self): await self.acquire() return self async def __aexit__(self, exc_type, exc_val, exc_tb): await self.release() ~~~ #### 2.2 资源监控 ~~~ python class ResourceMonitor: def __init__(self): self.metrics = defaultdict(list) def record_metric(self, name, value): self.metrics[name].append({ "value": value, "timestamp": datetime.now() }) def get_average(self, name): values = [m["value"] for m in self.metrics[name]] return sum(values) / len(values) if values else 0 def get_peak(self, name): values = [m["value"] for m in self.metrics[name]] return max(values) if values else 0
总结#
Skills 与主代理的交互机制是一个复杂而精密的系统,涉及多种交互模式、通信机制、状态管理、错误处理和性能优化。理解这些机制有助于:
- 优化性能:通过并行执行和资源管理提高性能
- 增强可靠性:通过完善的错误处理和恢复机制提高可靠性
- 改善体验:通过流式通信和事件驱动改善用户体验
- 支持扩展:通过灵活的交互机制支持功能扩展
在下一节中,我们将探讨 Skills 的性能优化策略,了解如何进一步提高 Skills 的执行效率。
bash~~~